{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "#### From DSScratch chapter 24 MapReduce \n",
    "import os\n",
    "os.chdir(\"/home/lab466/pythons/pyDSScratch/code-python3\")\n",
    "import math, random, re, datetime\n",
    "from collections import defaultdict, Counter\n",
    "from functools import partial\n",
    "from naive_bayes import tokenize"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def word_count_old(documents):\n",
    "    \"\"\"count words directly with a Counter, without MapReduce\"\"\"\n",
    "    tally = Counter()\n",
    "    for doc in documents:\n",
    "        # Counter.update adds one for each item tokenize produces\n",
    "        tally.update(tokenize(doc))\n",
    "    return tally"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def wc_mapper(document):\n",
    "    \"\"\"emit a (word, 1) pair for every word tokenize produces from the document\"\"\"\n",
    "    yield from ((token, 1) for token in tokenize(document))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def wc_reducer(word, counts):\n",
    "    \"\"\"sum up the counts for a word\"\"\"\n",
    "    yield (word, sum(counts))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def word_count(documents):\n",
    "    \"\"\"count the words in the input documents using MapReduce\"\"\"\n",
    "\n",
    "    # map phase: gather every emitted count under its word\n",
    "    grouped = defaultdict(list)\n",
    "    for doc in documents:\n",
    "        for token, n in wc_mapper(doc):\n",
    "            grouped[token].append(n)\n",
    "\n",
    "    # reduce phase: collect whatever the reducer emits for each word\n",
    "    results = []\n",
    "    for token, counts in grouped.items():\n",
    "        results.extend(wc_reducer(token, counts))\n",
    "    return results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def map_reduce(inputs, mapper, reducer):\n",
    "    \"\"\"runs MapReduce on the inputs using mapper and reducer\"\"\"\n",
    "    collector = defaultdict(list)\n",
    "\n",
    "    for input in inputs:\n",
    "        for key, value in mapper(input):\n",
    "            collector[key].append(value)\n",
    "\n",
    "    return [output\n",
    "            for key, values in collector.items()\n",
    "            for output in reducer(key,values)]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def reduce_with(aggregation_fn, key, values):\n",
    "    \"\"\"reduces a key-values pair by applying aggregation_fn to the values\"\"\"\n",
    "    yield (key, aggregation_fn(values))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def values_reducer(aggregation_fn):\n",
    "    \"\"\"wrap a (values -> output) function as a (key, values) reducer\"\"\"\n",
    "    # bind aggregation_fn as the first argument of reduce_with\n",
    "    reducer = partial(reduce_with, aggregation_fn)\n",
    "    return reducer\n",
    "\n",
    "# ready-made reducers for common aggregations\n",
    "sum_reducer, max_reducer, min_reducer = (values_reducer(fn)\n",
    "                                         for fn in (sum, max, min))\n",
    "count_distinct_reducer = values_reducer(lambda vals: len(set(vals)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "#\n",
    "# Analyzing Status Updates\n",
    "#\n",
    "\n",
    "status_updates = [\n",
    "    {\"id\": 1,\n",
    "     \"username\" : \"joelgrus\",\n",
    "     \"text\" : \"Is anyone interested in a data science book?\",\n",
    "     \"created_at\" : datetime.datetime(2013, 12, 21, 11, 47, 0),\n",
    "     \"liked_by\" : [\"data_guy\", \"data_gal\", \"bill\"] },\n",
    "    # add your own\n",
    "]\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def data_science_day_mapper(status_update):\n",
    "    \"\"\"yields (day_of_week, 1) if status_update contains \"data science\" \"\"\"\n",
    "    if \"data science\" in status_update[\"text\"].lower():\n",
    "        day_of_week = status_update[\"created_at\"].weekday()\n",
    "        yield (day_of_week, 1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# per weekday, how many status updates mention \"data science\"\n",
    "data_science_days = map_reduce(inputs=status_updates,\n",
    "                               mapper=data_science_day_mapper,\n",
    "                               reducer=sum_reducer)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def words_per_user_mapper(status_update):\n",
    "    \"\"\"emit (username, (word, 1)) for every word tokenize produces\n",
    "    from the update's text\"\"\"\n",
    "    author = status_update[\"username\"]\n",
    "    yield from ((author, (token, 1))\n",
    "                for token in tokenize(status_update[\"text\"]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def most_popular_word_reducer(user, words_and_counts):\n",
    "    \"\"\"given a sequence of (word, count) pairs,\n",
    "    return the word with the highest total count\"\"\"\n",
    "\n",
    "    word_counts = Counter()\n",
    "    for word, count in words_and_counts:\n",
    "        word_counts[word] += count\n",
    "\n",
    "    word, count = word_counts.most_common(1)[0]\n",
    "\n",
    "    yield (user, (word, count))\n",
    "\n",
    "# for each user, the (word, count) pair of their most-used word\n",
    "user_words = map_reduce(inputs=status_updates,\n",
    "                        mapper=words_per_user_mapper,\n",
    "                        reducer=most_popular_word_reducer)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def liker_mapper(status_update):\n",
    "    user = status_update[\"username\"]\n",
    "    for liker in status_update[\"liked_by\"]:\n",
    "        yield (user, liker)\n",
    "\n",
    "# for each user, how many different people liked any of their updates\n",
    "distinct_likers_per_user = map_reduce(inputs=status_updates,\n",
    "                                      mapper=liker_mapper,\n",
    "                                      reducer=count_distinct_reducer)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "#\n",
    "# matrix multiplication\n",
    "#\n",
    "\n",
    "def matrix_multiply_mapper(m, element):\n",
    "    \"\"\"m is the common dimension (columns of A, rows of B)\n",
    "    element is a tuple (matrix_name, i, j, value)\"\"\"\n",
    "    matrix, i, j, value = element\n",
    "\n",
    "    if matrix == \"A\":\n",
    "        for column in range(m):\n",
    "            # A_ij is the jth entry in the sum for each C_i_column\n",
    "            yield((i, column), (j, value))\n",
    "    else:\n",
    "        for row in range(m):\n",
    "            # B_ij is the ith entry in the sum for each C_row_j\n",
    "            yield((row, j), (i, value))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def matrix_multiply_reducer(m, key, indexed_values):\n",
    "    results_by_index = defaultdict(list)\n",
    "    for index, value in indexed_values:\n",
    "        results_by_index[index].append(value)\n",
    "\n",
    "    # sum up all the products of the positions with two results\n",
    "    sum_product = sum(results[0] * results[1]\n",
    "                      for results in results_by_index.values()\n",
    "                      if len(results) == 2)\n",
    "\n",
    "    if sum_product != 0.0:\n",
    "        yield (key, sum_product)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "wc_mapper results\n",
      "[('data', 1), ('science', 1), ('data', 1), ('big', 1), ('science', 1), ('fiction', 1)]\n",
      "\n",
      "word count results\n",
      "[('data', 2), ('science', 2), ('big', 1), ('fiction', 1)]\n",
      "\n",
      "word count using map_reduce function\n",
      "[('data', 2), ('science', 2), ('big', 1), ('fiction', 1)]\n",
      "\n",
      "data science days\n",
      "[(5, 1)]\n",
      "\n",
      "user words\n",
      "[('joelgrus', ('data', 1))]\n",
      "\n",
      "distinct likers\n",
      "[('joelgrus', 3)]\n",
      "\n",
      "map-reduce matrix multiplication\n",
      "entries: [('A', 0, 0, 3), ('A', 0, 1, 2), ('B', 0, 0, 4), ('B', 0, 1, -1), ('B', 1, 0, 10)]\n",
      "result: [((0, 0), 32), ((0, 1), -3)]\n"
     ]
    }
   ],
   "source": [
    "# demo: run each example defined in the cells above and print its result\n",
    "if __name__ == \"__main__\":\n",
    "\n",
    "    documents = [\"data science\", \"big data\", \"science fiction\"]\n",
    "\n",
    "    # flatten the (word, 1) pairs emitted for every document into one list\n",
    "    wc_mapper_results = [result\n",
    "                         for document in documents\n",
    "                         for result in wc_mapper(document)]\n",
    "\n",
    "    print(\"wc_mapper results\")\n",
    "    print(wc_mapper_results)\n",
    "    print()\n",
    "\n",
    "    print(\"word count results\")\n",
    "    print(word_count(documents))\n",
    "    print()\n",
    "\n",
    "    # same count, this time through the generic map_reduce driver\n",
    "    print(\"word count using map_reduce function\")\n",
    "    print(map_reduce(documents, wc_mapper, wc_reducer))\n",
    "    print()\n",
    "\n",
    "    # data_science_days, user_words and distinct_likers_per_user were\n",
    "    # already computed at module level in earlier cells\n",
    "    print(\"data science days\")\n",
    "    print(data_science_days)\n",
    "    print()\n",
    "\n",
    "    print(\"user words\")\n",
    "    print(user_words)\n",
    "    print()\n",
    "\n",
    "    print(\"distinct likers\")\n",
    "    print(distinct_likers_per_user)\n",
    "    print()\n",
    "\n",
    "    # matrix multiplication\n",
    "\n",
    "    # each entry is (matrix_name, i, j, value); B has no (1, 1) entry listed\n",
    "    entries = [(\"A\", 0, 0, 3), (\"A\", 0, 1,  2),\n",
    "           (\"B\", 0, 0, 4), (\"B\", 0, 1, -1), (\"B\", 1, 0, 10)]\n",
    "    # 3 is bound as m: the mapper emits keys over range(3), and the reducer\n",
    "    # (which ignores m) yields nothing for keys whose sum of products is zero\n",
    "    mapper = partial(matrix_multiply_mapper, 3)\n",
    "    reducer = partial(matrix_multiply_reducer, 3)\n",
    "\n",
    "    print(\"map-reduce matrix multiplication\")\n",
    "    print(\"entries:\", entries)\n",
    "    print(\"result:\", map_reduce(entries, mapper, reducer))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
